1.掌握hdfs的java操作 2.理解namenode与datanode的工作机制
1.hdfs的java操作 2.hdfs的组成部分详解
本课程的开发环境基于windows操作系统来配置的,使用的HDFS版本是hadoop2.7.1.
下载winutils的windows版本https://github.com/SweetInk/hadoop-common-2.7.1-bin
配置环境变量
第一步
第二步
第三步
压缩包(hadoop-common-2.7.1-bin)里的hadoop.dll,并拷贝到c:\windows\system32目录中。
在eclipse环境中创建一个maven项目,并引入依赖。
xxxxxxxxxx
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
hadoop中关于文件操作类基本上全部是在org.apache.hadoop.fs包中,这些api能够支持的操作包含:打开文件,读写文件,删除文件等。
FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个:
static FileSystem get(Configuration conf);
ximport java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
publicclass HadoopFSOperations {
public static void main(String[] args) throws Exception {
// createNewHDFSFile("/tmp/create2.c", "hello");
// System.out.println(readHDFSFile("/tmp/copy.c").toString());
// mkdir("/tmp/testdir");
// deleteDir("/tmp/testdir");
listAll("/tmp/");
}
/*
* upload the local file to the hds notice that the path is full like
* /tmp/test.c
*/
public static void uploadLocalFile2HDFS(String s, String d) throws IOException {
Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);
Path src = new Path(s);
Path dst = new Path(d);
hdfs.copyFromLocalFile(src, dst);
hdfs.close();
}
/*
* create a new file in the hdfs. notice that the toCreateFilePath is the
* full path and write the content to the hdfs file.
*/
public static void createNewHDFSFile(String toCreateFilePath, String content) throws IOException {
Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);
FSDataOutputStream os = hdfs.create(new Path(toCreateFilePath));
os.write(content.getBytes("UTF-8"));
os.close();
hdfs.close();
}
/*
* delete the hdfs file notice that the dst is the full path name
*/
public static boolean deleteHDFSFile(String dst) throws IOException {
Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);
Path path = new Path(dst);
booleanisDeleted = hdfs.delete(path);
hdfs.close();
returnisDeleted;
}
/*
* read the hdfs file content notice that the dst is the full path name
*/
public static byte[] readHDFSFile(String dst) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
// check if the file exists
Path path = new Path(dst);
if (fs.exists(path)) {
FSDataInputStream is = fs.open(path);
// get the file info to create the buffer
FileStatus stat = fs.getFileStatus(path);
// create the buffer
byte[] buffer = newbyte[Integer.parseInt(String.valueOf(stat.getLen()))];
is.readFully(0, buffer);
// 多次读取
// int length = 0;
// while ((length = is.read(buffer, 0, 128)) != -1) {
// System.out.println(new String(buffer, 0, length));
// }
is.close();
fs.close();
returnbuffer;
} else {
thrownew Exception("the file is not found .");
}
}
/*
* make a new dir in the hdfs
* the dir may like '/tmp/testdir'
*/
public static void mkdir(String dir) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(new Path(dir));
fs.close();
}
/*
* delete a dir in the hdfs
* dir may like '/tmp/testdir'
*/
public static void deleteDir(String dir) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(dir));
fs.close();
}
public static void listAll(String dir) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FileStatus[] stats = fs.listStatus(new Path(dir));
for (inti = 0; i<stats.length; ++i) {
if (stats[i].isFile()) {
// regular file
System.out.println(stats[i].getPath().toString());
} elseif (stats[i].isDirectory()) {
// dir
System.out.println(stats[i].getPath().toString());
} elseif (stats[i].isSymlink()) {
// is s symlink in linux
System.out.println(stats[i].getPath().toString());
}
}
fs.close();
}
}
我们知道,首先对于任何文件系统都是与当前环境变量紧密联系一起,对于当前 HDFS来说,在创建出当前文件系统实例之前,有必要获得当前的环境变量。代码见下:
Configuration conf = new Configuration();
Configuration 类为用户提供了对当前环境变量的一个实例,其中封装了当前搭载环境的配置,这配置是在我们由 core-site.xml 设置,一般返回值默认的本地系统文件。
而对于 HDFS 为我们提供的 FileSystem,更进一步为我们提供了一套加载当前环境并建立读写路径的 API,使用的方法如下所示:
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
第一个方法使用默认的 URI 地址获取当前对象中环境变量加载的文件系统,第二个方法使用传入的 URI 获取路径指定的文件系统。
我们在对 FSDataInputStream 进行分析之前,将上面例子中代码替换为如下所示:
InputStream fis = fs.open(inPath);
程序依旧可以正常运行。
xxxxxxxxxx
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable {
publicsynchronizedvoid seek(longdesired) throws IOException {
((Seekable) in).seek(desired);
}
publicint read(longposition, byte[] buffer, intoffset, intlength) throws IOException {
return ((PositionedReadable) in).read(position, buffer, offset, length);
}
publicvoid readFully(longposition, byte[] buffer, intoffset, intlength) throws IOException{
((PositionedReadable) in).readFully(position, buffer, offset, length);
}
publicvoid readFully(longposition, byte[] buffer) throws IOException {
((PositionedReadable) in).readFully(position, buffer, 0, buffer.length);
}
}
xxxxxxxxxx
publicclass FSDSample {
publicstaticvoid main(String[] args) throws Exception {
Configuration conf = new Configuration(); // 获取环境变量
FileSystem fs = FileSystem.get(conf); // 获取文件系统
FSDataInputStream fsin = fs.open(new Path("sample.txt")); // 建立输入流
byte[] buff = newbyte[128]; // 建立辅助字节数组
intlength = 0;
while ((length = fsin.read(buff, 0, 128)) != -1) { // 将数据读入缓存数组
System.out.println(new String(buff, 0, length)); // 打印数据
}
System.out.println("length = " + fsin.getPos()); // 打印输入流的长度
fsin.seek(0); // 返回开始处
while ((length = fsin.read(buff, 0, 128)) != -1) { // 将数据读入缓存数组
System.out.println(new String(buff, 0, length)); // 打印数据
}
fsin.seek(0); // 返回开始处
byte[] buff2 = newbyte[128]; // 建立辅助字节数组
fsin.read(buff2, 0, 128);
System.out.println("buff2 =" + new String(buff2));
System.out.println(buff2.length);
}
}
上述代码是重复读取指定 HDFS 中文件的内容,第一次读取结束后,调用 seek(0)方法从而返回文件开始处重新进行数据读取。
xxxxxxxxxx
public FSDataOutputStream create(Path f) throws IOException {
return create(f, true);
}
public FSDataOutputStream create(Path f, booleanoverwrite) throws IOException {
return create(f, overwrite, getConf().getInt("io.file.buffer.size", 4096), getDefaultReplication(),
getDefaultBlockSize());
}
FSDataOutputStream 也是继承自 OutoutStream 的一个子类,专用为 FileSystem 提供文件的输出流。然后可以使用 OutoutStream 中的 write 方法对字节数组进行写操作。
xxxxxxxxxx
public class FSWriteSample {
public static void main(String[] args) throws Exception {
Path path = new Path("writeSample.txt"); // 创建写路径
Configuration conf = new Configuration(); // 获取环境变量
FileSystem fs = FileSystem.get(conf); // 获取文件系统
FSDataOutputStream fsout = fs.create(path); // 创建输出流
byte[] buff = "hello world".getBytes(); // 设置输出字节数组
fsout.write(buff); // 开始写出数组
IOUtils.closeStream(fsout); // 关闭写出流
}
}
public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
}
有一个是Progressable progress 的形参, Progressable 接口如下所示:
xxxxxxxxxx
publicinterface Progressable {
public void progress(); // 调用 progress 方法
}
Progressable 接口中只有一个 progress 方法,每次在 64K 的文件写入既定的输入流以后,调用一次 progress 方法,这给我们提供了很好一次机会,可以将输出进度显示,代码如下:
xxxxxxxxxx
public class FSWriteSample2 {
static int index = 0;
public static void main(String[] args) throws Exception {
StringBuffer sb = new StringBuffer(); // 创建辅助可变字符串
Random rand = new Random();
for (inti = 0; i< 9999999; i++) { // 随机写入字符
sb.append((char) rand.nextInt(100));
}
byte[] buff = sb.toString().getBytes(); // 生成字符数组
Path path = new Path("writeSample.txt"); // 创建路径
Configuration conf = new Configuration(); // 获取环境变量
FileSystem fs = FileSystem.get(conf); // 获取文件系统
FSDataOutputStream fsout = fs.create(path, new Progressable() { // 创建写出流
publicvoid progress() { // 默认的实用方法
System.out.println(++index); // 打印出序列号
}
});
fsout.write(buff); // 开始写出操作
IOUtils.closeStream(fsout); // 关闭写出流
}
}
注意: FSDataOutputStream 与 FSDataInputStream 类似,也有 getPos 方法,返回是文件内以读取的长度。但是不同之处在于, FSDataOutputStream 不能够使用 seek 方法对文件重新定位。
NameNode启动的时候首先将fsimage(镜像)载入内存,并执行(replay)编辑日志editlog的的各项操作
一旦在内存中建立文件系统元数据映射,则创建一个新的fsimage文件(这个过程不需SecondaryNameNode) 和一个空的editlog
在安全模式下,各个datanode会向namenode发送块列表的最新情况
此刻namenode运行在安全模式。即NameNode的文件系统对于客户端来说是只读的。(显示目录,显示文件内容等。写、删除、重命名都会失败)
NameNode开始监听RPC和HTTP请求
解释RPC:RPC(Remote Procedure Call Protocol)——远程过程通过协议,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议
系统中数据块的位置并不是由namenode维护的,而是以块列表形式存储在datanode中
在系统的正常操作期间,namenode会在内存中保留所有块信息的映射信息
NameNode两个重要文件
元数据镜像
定期合并fsimage与edits
安全模式下,集群属于只读状态。但是严格来说,只是保证HDFS元数据信息的访问,而不保证文件的访问,因为文件的组成Block信息此时NameNode还不一定已经知道了。所以只有NameNode已了解了Block信息的文件才能独到。而安全模式下任何对HDFS有更新的操作都会失败。
对于全新创建的HDFS集群,NameNode启动后不会进入安全模式,因为没有Block信息。
安全模式相关命令
查询当前是否安全模式
xxxxxxxxxx
hadoop dfsadmin -safemode get
Safe mode is ON
等待safemode关闭,以便后续操作
xxxxxxxxxx
hadoop dfsadmin -safemode wait
退出安全模式
xxxxxxxxxx
hadoop dfsadmin -safemode leave
设置启用safemode
xxxxxxxxxx
hadoop dfsadmin -safemode enter
存储管理用户的文件块数据定期向namenode汇报自身所持有的block信息(通过心跳信息上报汇报的目的是,namenode如果长时间接收不到一个datanode的心跳后,说明该datanode宕机了,该datanode的数据块就被namdenode拿不到了,需要从其他机上拿该datanode的数据块副本。
datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。